/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.checkpointing.utils;

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Collector;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.junit.Assert.assertEquals;

/**
 * Migration ITCases for a stateful job. The tests are parameterized to cover migrating for multiple
 * previous Flink versions, as well as for different state backends.
 */
@RunWith(Parameterized.class)
public class LegacyStatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {

    private static final int NUM_SOURCE_ELEMENTS = 4;

    @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    public static Collection<Tuple2<MigrationVersion, String>> parameters() {
        return Arrays.asList(
                Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
                Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    }

    /**
     * TODO to generate savepoints for a specific Flink version / backend type, TODO change these
     * values accordingly, e.g. to generate for 1.3 with RocksDB, TODO set as
     * (MigrationVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) TODO Note: You should
     * generate the savepoint based on the release branch instead of the master.
     */
    private final MigrationVersion flinkGenerateSavepointVersion = MigrationVersion.v1_4;

    private final String flinkGenerateSavepointBackendType =
            StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;

    private final MigrationVersion testMigrateVersion;
    private final String testStateBackend;

    public LegacyStatefulJobSavepointMigrationITCase(
            Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
        this.testMigrateVersion = testMigrateVersionAndBackend.f0;
        this.testStateBackend = testMigrateVersionAndBackend.f1;
    }

    /** Manually run this to write binary snapshot data. */
    @Test
    @Ignore
    public void writeSavepoint() throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        switch (flinkGenerateSavepointBackendType) {
            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
                break;
            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                env.setStateBackend(new MemoryStateBackend());
                break;
            default:
                throw new UnsupportedOperationException();
        }

        env.enableCheckpointing(500);
        env.setParallelism(4);
        env.setMaxParallelism(4);

        env.addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
                .flatMap(new LegacyCheckpointedFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
                .keyBy(0)
                .flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
                .keyBy(0)
                .flatMap(new KeyedStateSettingFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
                .keyBy(0)
                .transform(
                        "custom_operator",
                        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
                        new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
                .uid("LegacyCheckpointedOperator")
                .keyBy(0)
                .transform(
                        "timely_stateful_operator",
                        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
                        new TimelyStatefulOperator())
                .uid("TimelyStatefulOperator")
                .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());

        executeAndSavepoint(
                env,
                "src/test/resources/"
                        + getSavepointPath(
                                flinkGenerateSavepointVersion, flinkGenerateSavepointBackendType),
                new Tuple2<>(
                        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    @Test
    public void testSavepointRestore() throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());

        switch (testStateBackend) {
            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
                break;
            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                env.setStateBackend(new MemoryStateBackend());
                break;
            default:
                throw new UnsupportedOperationException();
        }

        env.enableCheckpointing(500);
        env.setParallelism(4);
        env.setMaxParallelism(4);

        env.addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS))
                .setMaxParallelism(1)
                .uid("LegacyCheckpointedSource")
                .flatMap(new CheckingRestoringFlatMap())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMap")
                .keyBy(0)
                .flatMap(new CheckingRestoringFlatMapWithKeyedState())
                .startNewChain()
                .uid("LegacyCheckpointedFlatMapWithKeyedState")
                .keyBy(0)
                .flatMap(new CheckingKeyedStateFlatMap())
                .startNewChain()
                .uid("KeyedStateSettingFlatMap")
                .keyBy(0)
                .transform(
                        "custom_operator",
                        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
                        new CheckingRestoringUdfOperator(
                                new CheckingRestoringFlatMapWithKeyedStateInOperator()))
                .uid("LegacyCheckpointedOperator")
                .keyBy(0)
                .transform(
                        "timely_stateful_operator",
                        new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
                        new CheckingTimelyStatefulOperator())
                .uid("TimelyStatefulOperator")
                .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());

        restoreAndExecute(
                env,
                getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
                new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
                new Tuple2<>(
                        CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingRestoringFlatMapWithKeyedStateInOperator
                                .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR,
                        NUM_SOURCE_ELEMENTS),
                new Tuple2<>(
                        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
    }

    private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
        switch (backendType) {
            case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
                return "stateful-udf-migration-itcase-flink"
                        + savepointVersion
                        + "-rocksdb-savepoint";
            case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
                return "stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
            default:
                throw new UnsupportedOperationException();
        }
    }

    private static class LegacyCheckpointedSource implements SourceFunction<Tuple2<Long, Long>> {

        public static String checkpointedString = "Here be dragons!";

        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;

        private final int numElements;

        public LegacyCheckpointedSource(int numElements) {
            this.numElements = numElements;
        }

        @Override
        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {

            ctx.emitWatermark(new Watermark(0));

            synchronized (ctx.getCheckpointLock()) {
                for (long i = 0; i < numElements; i++) {
                    ctx.collect(new Tuple2<>(i, i));
                }
            }

            // don't emit a final watermark so that we don't trigger the registered event-time
            // timers
            while (isRunning) {
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    private static class CheckingRestoringSource extends RichSourceFunction<Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingRestoringSource.class + "_RESTORE_CHECK";

        private volatile boolean isRunning = true;

        private final int numElements;

        private String restoredState;

        public CheckingRestoringSource(int numElements) {
            this.numElements = numElements;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);

            // immediately trigger any set timers
            ctx.emitWatermark(new Watermark(1000));

            synchronized (ctx.getCheckpointLock()) {
                for (long i = 0; i < numElements; i++) {
                    ctx.collect(new Tuple2<>(i, i));
                }
            }

            while (isRunning) {
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    private static class LegacyCheckpointedFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);
        }
    }

    private static class CheckingRestoringFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingRestoringFlatMap.class + "_RESTORE_CHECK";

        private transient Tuple2<String, Long> restoredState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    private static class LegacyCheckpointedFlatMapWithKeyedState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static Tuple2<String, Long> checkpointedTuple = new Tuple2<>("hello", 42L);

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            getRuntimeContext().getState(stateDescriptor).update(value.f1);

            assertEquals(value.f1, getRuntimeContext().getState(stateDescriptor).value());
        }
    }

    private static class CheckingRestoringFlatMapWithKeyedState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK";

        private transient Tuple2<String, Long> restoredState;

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            assertEquals(value.f1, state.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    private static class CheckingRestoringFlatMapWithKeyedStateInOperator
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK";

        private transient Tuple2<String, Long> restoredState;

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            assertEquals(value.f1, state.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    private static class KeyedStateSettingFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            getRuntimeContext().getState(stateDescriptor).update(value.f1);
        }
    }

    private static class CheckingKeyedStateFlatMap
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            out.collect(value);

            ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
            if (state == null) {
                throw new RuntimeException("Missing key value state for " + value);
            }

            assertEquals(value.f1, state.value());
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }
    }

    private static class CheckpointedUdfOperator
            extends AbstractUdfStreamOperator<
                    Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";

        public CheckpointedUdfOperator(
                FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
            super(userFunction);
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    private static class CheckingRestoringUdfOperator
            extends AbstractUdfStreamOperator<
                    Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =
                CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";

        private String restoredState;

        public CheckingRestoringUdfOperator(
                FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
            super(userFunction);
        }

        @Override
        public void open() throws Exception {
            super.open();

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
            getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    private static class TimelyStatefulOperator extends AbstractStreamOperator<Tuple2<Long, Long>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                    Triggerable<Long, Long> {
        private static final long serialVersionUID = 1L;

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        private transient InternalTimerService<Long> timerService;

        @Override
        public void open() throws Exception {
            super.open();

            timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            ValueState<Long> state =
                    getKeyedStateBackend()
                            .getPartitionedState(
                                    element.getValue().f0,
                                    LongSerializer.INSTANCE,
                                    stateDescriptor);

            state.update(element.getValue().f1);

            timerService.registerEventTimeTimer(
                    element.getValue().f0, timerService.currentWatermark() + 10);
            timerService.registerProcessingTimeTimer(
                    element.getValue().f0, timerService.currentProcessingTime() + 30_000);

            output.collect(element);
        }

        @Override
        public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {}

        @Override
        public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {}

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }

    private static class CheckingTimelyStatefulOperator
            extends AbstractStreamOperator<Tuple2<Long, Long>>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                    Triggerable<Long, Long> {
        private static final long serialVersionUID = 1L;

        public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR =
                CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
        public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR =
                CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
        public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR =
                CheckingTimelyStatefulOperator.class + "_PT_CHECKS";

        private final ValueStateDescriptor<Long> stateDescriptor =
                new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);

        private transient InternalTimerService<Long> timerService;

        @Override
        public void open() throws Exception {
            super.open();

            timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);

            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
            getRuntimeContext()
                    .addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
            ValueState<Long> state =
                    getKeyedStateBackend()
                            .getPartitionedState(
                                    element.getValue().f0,
                                    LongSerializer.INSTANCE,
                                    stateDescriptor);

            assertEquals(state.value(), element.getValue().f1);
            getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);

            output.collect(element);
        }

        @Override
        public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
            ValueState<Long> state =
                    getKeyedStateBackend()
                            .getPartitionedState(
                                    timer.getNamespace(), LongSerializer.INSTANCE, stateDescriptor);

            assertEquals(state.value(), timer.getNamespace());
            getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
        }

        @Override
        public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
            ValueState<Long> state =
                    getKeyedStateBackend()
                            .getPartitionedState(
                                    timer.getNamespace(), LongSerializer.INSTANCE, stateDescriptor);

            assertEquals(state.value(), timer.getNamespace());
            getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
        }
    }

    private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
        private static final long serialVersionUID = 1L;

        public static final String NUM_ELEMENTS_ACCUMULATOR =
                AccumulatorCountingSink.class + "_NUM_ELEMENTS";

        int count = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
        }

        @Override
        public void invoke(T value) throws Exception {
            count++;
            getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
        }
    }
}
